package com.lcy.util;


import com.lcy.bean.PageC;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Flink sink that writes each incoming {@link PageC} record into the ClickHouse
 * table {@code week3} over JDBC.
 *
 * <p>The JDBC connection and the prepared insert statement are created in
 * {@link #open(Configuration)} and released in {@link #close()}. One insert is
 * executed per record.
 */
public class FlinkToClickHouse extends RichSinkFunction<PageC> {
    // JDBC handles are created in open(); they are marked transient so they are
    // never part of the serialized form of this function.
    private transient Connection conn;
    private transient PreparedStatement ps;

    /**
     * Loads the ClickHouse JDBC driver, opens the connection and prepares the
     * two-column insert statement that {@link #invoke} reuses for every record.
     *
     * @param parameters the configuration passed in by the caller (not read here)
     * @throws Exception if the driver class cannot be loaded or the connection or
     *                   statement cannot be created
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        conn = DriverManager.getConnection("jdbc:clickhouse://hadoop-single:8123", "default", "");
        ps = conn.prepareStatement("insert into week3 values (?,?)");
    }

    /**
     * Binds the record's {@code os_name} and {@code num} values to the prepared
     * statement and executes the insert.
     *
     * @param value   the record to write
     * @param context the sink context (not used)
     * @throws Exception if binding the parameters or executing the insert fails
     */
    @Override
    public void invoke(PageC value, Context context) throws Exception {
        ps.setString(1, value.getOs_name());
        ps.setLong(2, value.getNum());
        ps.execute();
    }

    /**
     * Releases the prepared statement and the connection, then delegates to the
     * superclass.
     *
     * <p>The nested {@code finally} blocks make sure the connection is still
     * closed when closing the statement throws, and that {@code super.close()}
     * still runs when closing the connection throws. Both handles may be
     * {@code null} if {@link #open(Configuration)} failed part-way, so each is
     * checked before use.
     *
     * @throws Exception if closing a JDBC resource or the superclass fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } finally {
                // Drop the references so a repeated close() does nothing further.
                ps = null;
                conn = null;
                super.close();
            }
        }
    }
}
